Spark Streaming之window(窗口操作)

您所在的位置:网站首页 structured streaming window Spark Streaming之window(窗口操作)

Spark Streaming之window(窗口操作)

2024-03-25 05:27| 来源: 网络整理| 查看: 265

Spark Streaming 还提供了窗口的计算,它允许通过滑动窗口对数据进行转换,窗口转换操作如下图 所示: 在这里插入图片描述 在 Spark Streaming 中,数据处理是按批进行的,而数据采集是逐条进行的,因此在 Spark Streaming 中会先设置好批处理间隔,当超过批处理间隔的时候就会把采集到的数据汇总起来成为一批数据交给系统去处理。

对于窗口操作而言,在其窗口内部会有 N 个批处理数据,批处理数据的大小由窗口间隔决定,而窗口间隔指的就是窗口的持续时间。

在窗口操作中,只有窗口的长度满足了才会触发批数据的处理。除了窗口的长度,窗口操作还有另一个重要的参数,即滑动间隔,它指的是经过多长时间窗口滑动一次形成新的窗口。

滑动间隔默认情况下和批次间隔相同,而窗口间隔一般设置得要比它们两个大。在这里必须注意的一点是,滑动间隔和窗口间隔的大小一定得设置为批处理间隔的整数倍。

如下图所示,批处理间隔是 1 个时间单位,窗口间隔是 3 个时间单位,滑动间隔是 2 个时间单位。对于初始的窗口(time 1~time 3),只有窗口间隔满足了才会触发数据的处理。

注意: 有可能初始的窗口没有被流入的数据撑满,但是随着时间的推进/窗口最终会被撑满。每过 2 个时间单位,窗口滑动一次,这时会有新的数据流入窗口,窗口则移去最早的 2 个时间单位的数据,而与最新的 2 个时间单位的数据进行汇总形成新的窗口(time 3~ time 5)。

在这里插入图片描述 上图所表达的就是对每三秒钟的数据执行一次滑动窗口计算,这3秒内的3个RDD会被聚合起来进行处理,然后过了两秒钟,又会对最近三秒内的数据执行滑动窗口计算。所以每个滑动窗口操作,都必须指定两个参数,窗口长度以及滑动间隔,而且这两个参数值都必须是batch间隔的整数倍。

1. window(windowLength, slideInterval)

该操作由一个DStream对象调用,传入一个窗口长度参数,一个窗口移动速率参数,然后将当前时刻当前长度窗口中的元素取出形成一个新的DStream。

示例: 以长度为3,移动速率为1截取源DStream中的元素形成新的DStream。

val conf = new SparkConf().setMaster("local[2]").setAppName("TestCount") val ssc = new StreamingContext(conf,Seconds(1)) val lines = ssc.socketTextStream("localhost",9999) val words = lines.flatMap (_.split(" ")) val windowCounts = words.window(Second(3),Second(1)) windowCounts.print() //通过start()启动消息采集和处理 ssc.start() //启动完成后就不能再做其它操作 //等待计算完成 ssc.awaitTermination()

在这里插入图片描述

基本上每秒输入一个字母,然后取出当前时刻3秒这个长度中的所有元素,打印出来。从上面的截图中可以看到,下一秒时已经看不到a了,再下一秒,已经看不到b和c了。表示a, b, c已经不在当前的窗口中。

2. countByWindow(windowLength,slideInterval)

返回指定长度窗口中的元素个数

示例: 统计当前3秒长度的时间窗口的DStream中元素的个数:

val conf = new SparkConf().setMaster("local[2]").setAppName("TestCount") val ssc = new StreamingContext(conf,Seconds(1)) val lines = ssc.socketTextStream


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3